[DataPipe] Add as_tuple argument for CSVParserIterDataPipe #646
Thank you, LGTM with one nit comment
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
@@ -68,6 +70,16 @@ def return_path(self, stream: Iterator[D], *, path: str) -> Iterator[Union[D, Tu
        for data in stream:
            yield path, data

    def as_tuple(self, stream: Iterator[List]) -> Iterator[Union[List, Tuple]]:
I just realized we need to change the type hint here. The input stream might not be Iterator[List]. We can use Iterator[D] -> Iterator[Union[D, Tuple]].
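A minimal sketch of what the suggested signature could look like. It assumes a module-level TypeVar named D, as the hunk header above implies; the free function as_tuple_stream and its as_tuple flag are hypothetical stand-ins for the helper method, not the merged code.

```python
from typing import Iterator, Tuple, TypeVar, Union

# Assumption: D mirrors the TypeVar used by the helper's other signatures.
D = TypeVar("D")


def as_tuple_stream(stream: Iterator[D], as_tuple: bool = True) -> Iterator[Union[D, Tuple]]:
    """Hypothetical free-function version of the helper's as_tuple method."""
    if not as_tuple:
        # Conversion disabled: pass every item through untouched.
        yield from stream
        return
    for data in stream:
        # Only list rows are converted; other item types pass through unchanged.
        if isinstance(data, list):
            yield tuple(data)
        else:
            yield data


rows = list(as_tuple_stream(iter([["a", "1"], {"k": "v"}, "raw"])))
```

With the generic hint, the signature no longer claims that every incoming item is a list, which matches streams that carry dicts or strings.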
good point, done.
            if isinstance(data, list):
                yield tuple(data)
            else:
                yield data
In terms of performance, we might want to directly yield tuple(data) without checking isinstance every single time when as_tuple is specified as True. In that case, users should take responsibility for handling non-list items themselves.
Could you please benchmark these two implementations with a long CSV file?
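The trade-off is not only about speed. The sketch below uses two hypothetical helper names, convert_checked and convert_unchecked, to show how the two variants treat a dict row such as the ones csv.DictReader produces. It only illustrates the behavior and is not the torchdata implementation.

```python
from typing import Any, Iterable, Iterator


def convert_checked(stream: Iterable[Any]) -> Iterator[Any]:
    """Convert list rows to tuples and leave every other item untouched."""
    for data in stream:
        if isinstance(data, list):
            yield tuple(data)
        else:
            yield data


def convert_unchecked(stream: Iterable[Any]) -> Iterator[Any]:
    """Call tuple() on every item, whatever its type."""
    for data in stream:
        yield tuple(data)


dict_rows = [{"key": "a", "item": "1"}]
checked = list(convert_checked(dict_rows))      # the dict row survives as a dict
unchecked = list(convert_unchecked(dict_rows))  # tuple(dict) keeps only the keys
```

Without the isinstance check, a dict row silently loses its values, which is the case users would have to handle themselves.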
Ok, I'll do this benchmarking.
Hi @ejguan, I ran the following benchmark:
import os
import timeit
import tempfile

from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper


def impl_a(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        if isinstance(data, list):
            yield tuple(data)
        else:
            yield data


def impl_b(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        yield tuple(data)


fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000

with tempfile.TemporaryDirectory() as temp_dir:
    temp_fpath = os.path.join(temp_dir, 'temp.csv')
    with open(temp_fpath, 'w') as f:
        f.write('\n'.join([fake_line, ] * line_num))

    datapipe1 = IterableWrapper([temp_fpath])
    datapipe2 = FileOpener(datapipe1, mode="b")
    csv_parser_dp = datapipe2.parse_csv(as_tuple=True)

    PlainTextReaderHelper.as_tuple = impl_a
    print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

    PlainTextReaderHelper.as_tuple = impl_b
    print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))
Got:
impl a: 13.500808166
impl b: 13.221415374999998
Any comments about this? ^_^
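To isolate the cost of the isinstance check from file reading and CSV parsing, a standard-library micro-benchmark along these lines could be used. This is a sketch on in-memory rows, not the torchdata benchmark above; the names with_check and without_check and the row count are assumptions chosen for illustration.

```python
import timeit


def with_check(stream):
    # Mirrors impl_a: convert only list rows.
    for data in stream:
        if isinstance(data, list):
            yield tuple(data)
        else:
            yield data


def without_check(stream):
    # Mirrors impl_b: convert every row unconditionally.
    for data in stream:
        yield tuple(data)


# 100,000 already-parsed rows of nine single-character fields each.
rows = [list("123456789") for _ in range(100_000)]

t_check = timeit.timeit(lambda: list(with_check(rows)), number=5)
t_no_check = timeit.timeit(lambda: list(without_check(rows)), number=5)
print("with isinstance check:   ", t_check)
print("without isinstance check:", t_no_check)
```

Timings vary by machine and interpreter version, so only the relative difference between the two numbers is meaningful.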
Let's run a dummy test for the case where isinstance(data, list) is False. To test it, you can expose as_tuple to parse_csv_as_dict and turn it on. Thank you.
Hi @ejguan, I added a benchmark for parse_csv_as_dict:
import csv
import os
import timeit
import tempfile

from torchdata.datapipes.iter import IterableWrapper, FileOpener
from torchdata.datapipes.iter.util.plain_text_reader import PlainTextReaderHelper
from torchdata.datapipes.iter.util.plain_text_reader import _CSVBaseParserIterDataPipe


def impl_a(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        if isinstance(data, list):
            yield tuple(data)
        else:
            yield data


def impl_b(self, stream):
    if not self._as_tuple:
        yield from stream
        return
    for data in stream:
        yield tuple(data)


fake_line = '1,2,3,4,5,6,7,8,9'
line_num = 100 * 10000

with tempfile.TemporaryDirectory() as temp_dir:
    temp_fpath = os.path.join(temp_dir, 'temp.csv')
    with open(temp_fpath, 'w') as f:
        f.write('\n'.join([fake_line, ] * line_num))

    datapipe1 = IterableWrapper([temp_fpath])
    datapipe2 = FileOpener(datapipe1, mode="b")
    csv_parser_dp = datapipe2.parse_csv(as_tuple=True)

    PlainTextReaderHelper.as_tuple = impl_a
    print("impl a: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

    PlainTextReaderHelper.as_tuple = impl_b
    print("impl b: ", timeit.timeit(stmt=lambda: list(csv_parser_dp), number=10))

    PlainTextReaderHelper.as_tuple = impl_a
    csv_dict_parser_dp1 = _CSVBaseParserIterDataPipe(
        datapipe2, csv.DictReader, decode=True, as_tuple=True)
    csv_dict_parser_dp2 = _CSVBaseParserIterDataPipe(
        datapipe2, csv.DictReader, decode=True, as_tuple=False)
    print("impl a with dict as_tupe=True: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp1), number=10))
    print("impl a with dict as_tupe=False: ", timeit.timeit(stmt=lambda: list(csv_dict_parser_dp2), number=10))
Got:
impl a: 13.746312875000001
impl b: 13.414058333999998
impl a with dict as_tupe=True: 25.277461583
impl a with dict as_tupe=False: 24.659475541
How about impl b with dict?
Hi, here are the results of impl_b with the dict parser:
impl a: 13.375169375
impl b: 13.102719625
impl a with dict as_tupe=True: 24.521925125000003
impl a with dict as_tupe=False: 23.991537542000003
impl b with dict as_tupe=True: 24.552794458999998
impl b with dict as_tupe=False: 24.026250667
Cool. Then, let's leave it as it is.
Thank you
@@ -159,6 +159,11 @@ def make_path(fname):
        expected_res = [("1.csv", ["key", "item"]), ("1.csv", ["a", "1"]), ("1.csv", ["b", "2"]), ("empty2.csv", [])]
        self.assertEqual(expected_res, list(csv_parser_dp))

        # Functional Test: yield one row at time from each file as tuple instead of list, skipping over empty content
        csv_parser_dp = datapipe3.parse_csv(as_tuple=True)
        expected_res = [("key", "item"), ("a", "1"), ("b", "2"), ()]
Hmm, it's weird that CI doesn't catch the error. Could you please change the variable name from expected_res to something else?
It breaks another test at line 171.
Edit: Actually, it is raised by CI.
My bad, I didn't notice that expected_res has been reused in another test case. Fixed!
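The shape of the expected values in that test can be reproduced with the standard-library csv module alone. The sketch below is a stand-in for the datapipe, not the test itself; the in-memory lines and the two distinct variable names (expected_lists, expected_tuples) are assumptions chosen to show how separate names keep one expectation from overwriting another.

```python
import csv

# In-memory stand-in for the file contents; the trailing empty string is a blank line.
lines = ["key,item", "a,1", "b,2", ""]

# Default behavior: csv.reader yields one list per line, and [] for a blank line.
rows_as_lists = list(csv.reader(lines))
expected_lists = [["key", "item"], ["a", "1"], ["b", "2"], []]

# Tuple behavior: each row converted with tuple(), so the blank line becomes ().
rows_as_tuples = [tuple(row) for row in csv.reader(lines)]
expected_tuples = [("key", "item"), ("a", "1"), ("b", "2"), ()]
```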
@@ -154,6 +168,7 @@ def __iter__(self) -> Iterator[Union[D, Tuple[str, D]]]:
            stream = self._helper.skip_lines(file)
            stream = self._helper.decode(stream)
            stream = self._csv_reader(stream, **self.fmtparams)
            stream = self._helper.as_tuple(stream)
mypy complains about it at this line. Could you please try to fix it? Otherwise, you can add a comment at the end of this line to bypass the mypy check with # type: ignore[assignment]
Done! mypy doesn't recommend reusing a variable name within one function body.
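One way to avoid this kind of assignment error, sketched here with the standard-library csv module rather than the datapipe code, is to bind each stage of the stream to its own name so that no variable changes element type. The function parse_rows and the names reader and tuple_rows are hypothetical; the thread does not say which fix was actually applied.

```python
import csv
from typing import Iterator, List, Tuple


def parse_rows(lines: Iterator[str]) -> Iterator[Tuple[str, ...]]:
    """Parse CSV lines and yield each row as a tuple."""
    reader: Iterator[List[str]] = csv.reader(lines)
    # A fresh name for the converted stream: rebinding `reader` to an iterator
    # of tuples would conflict with its declared element type.
    tuple_rows: Iterator[Tuple[str, ...]] = (tuple(row) for row in reader)
    return tuple_rows


parsed = list(parse_rows(iter(["a,1", "b,2"])))
```

The alternative mentioned in the review, a trailing # type: ignore[assignment] comment, keeps the single variable name at the cost of silencing the check on that line.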
Motivation
see pytorch/torcharrow#421
Changes
Add as_tuple argument for CSVParserIterDataPipe
Test as_tuple in tests/test_local_io.py